package com.ruyuan.eshop.promotion.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruyuan.eshop.common.constants.RedisKey;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.enums.CouponSendTypeEnum;
import com.ruyuan.eshop.common.enums.YesOrNoEnum;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformCouponUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.promotion.converter.CouponConverter;
import com.ruyuan.eshop.promotion.dao.SalesPromotionCouponDAO;
import com.ruyuan.eshop.promotion.dao.SalesPromotionCouponItemDAO;
import com.ruyuan.eshop.promotion.domain.dto.ReceiveCouponDTO;
import com.ruyuan.eshop.promotion.domain.dto.SaveOrUpdateCouponDTO;
import com.ruyuan.eshop.promotion.domain.dto.SendCouponDTO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionCouponDO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionCouponItemDO;
import com.ruyuan.eshop.promotion.domain.request.ReceiveCouponRequest;
import com.ruyuan.eshop.promotion.domain.request.SaveOrUpdateCouponRequest;
import com.ruyuan.eshop.promotion.domain.request.SendCouponRequest;
import com.ruyuan.eshop.promotion.enums.CouponStatusEnum;
import com.ruyuan.eshop.promotion.enums.CouponTypeEnum;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.redis.RedisCache;
import com.ruyuan.eshop.promotion.service.CouponService;
import com.ruyuan.eshop.push.api.MessagePushApi;
import com.ruyuan.eshop.push.domain.dto.SaveOrUpdateMessageDTO;
import com.ruyuan.eshop.push.domain.request.SaveOrUpdateMessageRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * 优惠券接口实现
 *
 * @author zhonghuashishan
 */
@Slf4j
@Service
public class CouponServiceImpl implements CouponService {

    /**
     * 开启优惠券活动DAO
     */
    @Autowired
    private SalesPromotionCouponDAO salesPromotionCouponDAO;

    /**
     * 开启优惠券活动DAO
     */
    @Autowired
    private SalesPromotionCouponItemDAO salesPromotionCouponItemDAO;

    /**
     * rocketmq生产者
     */
    @Resource
    private DefaultProducer defaultProducer;

    /**
     * 账户服务
     */
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    /**
     * 商品服务
     */
    @DubboReference(version = "1.0.0")
    private MessagePushApi messagePushApi;

    /**
     * entity转换工具
     */
    @Autowired
    private CouponConverter couponConverter;

    /**
     * redis客户端工具
     */
    @Autowired
    private RedisCache redisCache;
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 保存/修改优惠券活动方法
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);
        couponDO.setCouponReceivedCount(0);
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        // 判断优惠券类型，系统发放类型，写到redis中，然后用户登录触发领取优惠券
        if(CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())){
//            sendPlatformCouponMessage(couponDO);
            writeCouponToRedis(couponDO);
        }else {// SELF_RECEIVE 自己领取类的优惠券，则设置一个缓存，在用户登录的时候，自己领取（注：因为是全平台可领，所以没有数量限制）
            List<SalesPromotionCouponDO> promotionCouponDOS = new ArrayList<>();
            RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON__LOCK_KEY);
            try {
                lock.lock(60, TimeUnit.SECONDS);
                String promotions = redisCache.get(RedisKey.PROMOTION_COUPON_KEY);
                if(StringUtils.isNotBlank(promotions)){
                    promotionCouponDOS = JsonUtil.json2Object(promotions,List.class);
                }
                // 判断优惠是否过期，并清理过期优惠券
                promotionCouponDOS = validatePromotionEnd(promotionCouponDOS);
                promotionCouponDOS.add(couponDO);
                String promotionCouponDOSRedis = JsonUtil.object2Json(promotionCouponDOS);

                // 设置优惠券缓存，默认30天过期，具体时长设置要看公司活动的情况
                // 也可以设置为永不过期，每一次开启新活动的时候，都会把过期活动清理掉
                redisCache.set(RedisKey.PROMOTION_COUPON_KEY, promotionCouponDOSRedis,30 * 24 * 60 * 60);
            } finally {
                lock.unlock();
            }
        }

        SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO();
        dto.setCouponName(request.getCouponName());
        dto.setRule(request.getCouponRule());
        dto.setSuccess(true);
        return dto;
    }

    private void writeCouponToRedis(SalesPromotionCouponDO coupon) {
        // 我们需要用redisson基于redis做一个分布式锁的加锁，再去维护一个数据结构
        // 我们一共发出去了多少个券
        RLock lock = redissonClient.getLock(RedisKey.PROMOTION_COUPON_ID_LIST_LOCK);
        try {
            lock.lock(60, TimeUnit.SECONDS);

            List<Long> couponIds = null;

            String couponIdsJSON = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
            if(couponIdsJSON == null || couponIdsJSON.equals("")) {
                couponIds = new ArrayList<>();
            } else {
                couponIds = JSON.parseObject(couponIdsJSON, List.class);
            }

            // 检查每个优惠券，时间是否过期了，如果过期了，或者说已经发完了券，把他从list里删除，以及从redis里做一个删除
            // 如果是全量发券，没有说所谓的发完，他可以给所有人发，如果超过了时间，就不能发券了
            if(couponIds.size() > 0) {
                Iterator<Long> couponIdIterator = couponIds.iterator();
                while(couponIdIterator.hasNext()) {
                    Long tempCouponId = couponIdIterator.next();
                    String tempCouponJSON = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    SalesPromotionCouponDO tempCoupon = JSON.parseObject(tempCouponJSON, SalesPromotionCouponDO.class);

                    Date now = new Date();
                    if(now.after(tempCoupon.getActivityEndTime())) {
                        couponIdIterator.remove();
                        redisCache.delete(RedisKey.PROMOTION_COUPON_KEY + "::" + tempCouponId);
                    }
                }
            }

            couponIds.add(coupon.getId());
            couponIdsJSON = JsonUtil.object2Json(couponIds);
            redisCache.set(RedisKey.PROMOTION_COUPON_ID_LIST, couponIdsJSON, -1);

            String couponJSON = JsonUtil.object2Json(coupon);
            redisCache.set(RedisKey.PROMOTION_COUPON_KEY + "::" + coupon.getId(), couponJSON, -1);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 判断是否存在过期优惠券
     * @param couponDOS
     * @return
     */
    private List<SalesPromotionCouponDO> validatePromotionEnd(List<SalesPromotionCouponDO> couponDOS){
        // 重新搞一个list作为返回值
        // 这么做的目的是，尽量避免上下游方法对同一个list进行修改后直接返回
        // 尤其是遇到类似于list.subList这种方式，会造成数据问题
        /**
         *  public List<E> subList(int fromIndex, int toIndex) {
         *         subListRangeCheck(fromIndex, toIndex, size);
         *         return new SubList(this, 0, fromIndex, toIndex);
         *     }
         */
        List result = new ArrayList();
        for (SalesPromotionCouponDO couponDO:couponDOS) {
            if(couponDO.getActivityEndTime().before(new Date()) ||
                    !Objects.equals(couponDO.getCouponStatus(),
                            CouponStatusEnum.NORMAL.getCode())){
                continue;
            }
            result.add(couponDO);
        }
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public ReceiveCouponDTO receiveCoupon(ReceiveCouponRequest receiveCouponRequest) {
        // 获取优惠券信息
        SalesPromotionCouponDO couponDO = salesPromotionCouponDAO.getById(receiveCouponRequest.getCouponId());

        // 检查优惠券状态
        ReceiveCouponDTO dto = checkCouponStatus(couponDO);
        if (!dto.getSuccess()){
            return dto;
        }

        // 查询用户是否已经领取过该优惠券了，如果领取过，直接返回
        LambdaQueryWrapper<SalesPromotionCouponItemDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(SalesPromotionCouponItemDO::getCouponId, receiveCouponRequest.getCouponId());
        queryWrapper.eq(SalesPromotionCouponItemDO::getUserAccountId, receiveCouponRequest.getUserAccountId());
        int count = salesPromotionCouponItemDAO.count(queryWrapper);
        // 用户已经领取过该优惠券
        if (count > 0){
            dto.setSuccess(false);
            dto.setMessage("已经领取过该优惠券，不要重复领取哦");
            return dto;
        }

        // 修改优惠券领取数量
        couponDO.setCouponReceivedCount(couponDO.getCouponReceivedCount() + 1);
        // 如果领取数量与发放数量相同，将优惠券状态设置为发放完
        if (Objects.equals(couponDO.getCouponCount(), couponDO.getCouponReceivedCount())){
            couponDO.setCouponStatus(CouponStatusEnum.USED.getCode());
        }
        salesPromotionCouponDAO.updateById(couponDO);

        // 领取一张优惠券
        SalesPromotionCouponItemDO couponItemDO =
                buildSalesPromotionCouponItemDO(couponDO, receiveCouponRequest.getUserAccountId());

        // 添加一条领取记录
        salesPromotionCouponItemDAO.save(couponItemDO);

        return dto;
    }

    private SalesPromotionCouponItemDO buildSalesPromotionCouponItemDO(SalesPromotionCouponDO couponDO,Long accountId) {
        SalesPromotionCouponItemDO couponItemDO = SalesPromotionCouponItemDO.builder()
                .couponId(couponDO.getId())
                .userAccountId(accountId)
                .couponType(couponDO.getCouponType())
                .isUsed(YesOrNoEnum.NO.getCode())
                .activityStartTime(couponDO.getActivityStartTime())
                .activityEndTime(couponDO.getActivityEndTime())
                .createUser(couponDO.getCreateUser())
                .build();
        return couponItemDO;
    }

    /**
     * 检查优惠券的状态，并返回 领取优惠券结果 对象
     *
     * @param couponDO
     * @return
     */
    private ReceiveCouponDTO checkCouponStatus(SalesPromotionCouponDO couponDO) {
        if (Objects.isNull(couponDO)) {
            throw new BaseBizException("优惠券不存在");
        }

        ReceiveCouponDTO dto = new ReceiveCouponDTO();
        Integer couponStatus = couponDO.getCouponStatus();

        // 领取完或已过期
        if (!Objects.equals(couponStatus, CouponStatusEnum.NORMAL.getCode())) {
            dto.setSuccess(false);
            CouponStatusEnum statusEnum = CouponStatusEnum.getByCode(couponStatus);
            if (Objects.isNull(statusEnum)){
                throw new BaseBizException("优惠券领取失败");
            }
            dto.setMessage("优惠券"+ statusEnum.getMsg() + "，下次早点来哦");
            return dto;
        }
        // 发行数量小于或者等于领取数量，优惠券已经领取完
        if (couponDO.getCouponCount() <= couponDO.getCouponReceivedCount()) {
            // 修改coupon
            couponDO.setCouponStatus(CouponStatusEnum.USED.getCode());
            salesPromotionCouponDAO.updateById(couponDO);

            dto.setSuccess(false);
            dto.setMessage("优惠券已发放完，下次早点来哦");
            return dto;
        }

        // 优惠券过期
        if (couponDO.getActivityEndTime().before(new Date())) {
            // 修改coupon
            couponDO.setCouponStatus(CouponStatusEnum.EXPIRED.getCode());
            salesPromotionCouponDAO.updateById(couponDO);

            dto.setSuccess(false);
            dto.setMessage("优惠券已过期，下次早点来哦");
            return dto;
        }

        dto.setSuccess(true);
        return dto;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public SendCouponDTO sendCoupon(SendCouponRequest sendCouponRequest) {
        return sendCouponByConditions(sendCouponRequest);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {
        // 保存优惠券信息
        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);
        couponDO.setCouponReceivedCount(0);
        couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());
        couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());
        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);

        // 构建 messageRequest
        SaveOrUpdateMessageRequest messageRequest = buildSaveOrUpdateMessageRequest(sendCouponRequest);
        // 创建消息推送
        JsonResult<SaveOrUpdateMessageDTO> messageResult = messagePushApi
                .saveOrUpdateMessage(messageRequest);

        SendCouponDTO sendCouponDTO = new SendCouponDTO();
        sendCouponDTO.setSuccess(messageResult.getData().getSuccess());
        sendCouponDTO.setCouponName(sendCouponRequest.getCouponName());
        sendCouponDTO.setRule(sendCouponRequest.getCouponRule());

        // TODO 发放数量
        sendCouponDTO.setSendCount(0);

        return sendCouponDTO;
    }

    /**
     * 领取所有有效优惠券
     *
     * @param accountId
     */
    @Override
    public JsonResult<Boolean> receiveCouponAvailable(Long accountId) {
        String promotions = redisCache.get(RedisKey.PROMOTION_COUPON_KEY);
        List<SalesPromotionCouponDO> promotionCouponDOS = new ArrayList<>();
        if(StringUtils.isNotBlank(promotions)){
            promotionCouponDOS  = JSON.parseArray(promotions, SalesPromotionCouponDO.class);
        }
        log.info("缓存中的优惠券：{}",promotionCouponDOS.get(0));
        // 判断优惠是否过期，并清理过期优惠券
        promotionCouponDOS = validatePromotionEnd(promotionCouponDOS);

        log.info("当前有效的优惠券：{}",promotionCouponDOS);
        // 每次查询的数量
        Integer batchSize = 200;
        // 每次查询的起始位置
        Integer startIndex = 0;
        // 分批次从数据库中读取已保存的优惠券id
        List<Long> couponIds = new ArrayList<>(100);
        while(true){
            // 因为不需要整个优惠券数据对象，所以，自己定义一个分页查询
            List<Long> couponIdsLimits = salesPromotionCouponDAO
                    .queryAvailableCoupon(accountId, new Date(), startIndex, batchSize);
            if (couponIds.size() == 0){
                break;
            }
            couponIds.addAll(couponIdsLimits);
            startIndex += batchSize;
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                new BaseBizException("分批获取用户优惠券id异常");
            }
        }

        log.info("获取到的优惠券数据:{}",couponIds);
        // 循环判断用户是否已经领取过该优惠券了，如果领取过，直接返回，因为是循环操作数据库，
        // 所以我们对这个地方做一个优化，根据用户id，查询出所有其数据库中的有效优惠券id，
        // 在再内存里判断是否存在于可领取的优惠券中
        List<SalesPromotionCouponItemDO> filteredCouponItem = new ArrayList<>();
        for (SalesPromotionCouponDO promotionCouponDO : promotionCouponDOS){
            /*
            for 循环调用数据库判断是否领取过，不太合理
            LambdaQueryWrapper<SalesPromotionCouponItemDO> queryWrapper = Wrappers.lambdaQuery();
            queryWrapper.eq(SalesPromotionCouponItemDO::getCouponId, promotionCouponDOS);
            queryWrapper.eq(SalesPromotionCouponItemDO::getUserAccountId, accountId);
            int count = salesPromotionCouponItemDAO.count(queryWrapper);
             */
            // 用户已经领取过该优惠券
            if (couponIds.contains(promotionCouponDO.getId())){
                continue;
            }
            SalesPromotionCouponItemDO couponItemDO =
                    buildSalesPromotionCouponItemDO(promotionCouponDO, accountId);
            filteredCouponItem.add(couponItemDO);
        }

        // 批量保存可以领取的优惠券
        if(filteredCouponItem.size() > 0){
            log.info("领取到的优惠券：{}",filteredCouponItem);
            salesPromotionCouponItemDAO.saveBatch(filteredCouponItem);
        }
        return JsonResult.buildSuccess(true);
    }

    private SaveOrUpdateMessageRequest buildSaveOrUpdateMessageRequest(SendCouponRequest sendCouponRequest) {
        SaveOrUpdateMessageRequest request = SaveOrUpdateMessageRequest.builder()
                .pushType(sendCouponRequest.getPushType())
                .mainMessage("您有新的优惠券待领取")
                .informType(sendCouponRequest.getInformType())
                // 送您一张 满减/折扣 优惠券，请点击 http://www.ruyuan2020.com 领取
                .message("送您一张" + CouponTypeEnum.getByCode(sendCouponRequest.getCouponType()).getMsg()
                        + "优惠券，请点击 " + sendCouponRequest.getActiveUrl() + " 领取")
                .membershipFilterDTO(sendCouponRequest.getMembershipFilterDTO())
                .pushStartTime(sendCouponRequest.getPushStartTime())
                .pushEndTime(sendCouponRequest.getPushEndTime())
                .sendPeriodCount(sendCouponRequest.getSendPeriodCount())
                .createUser(sendCouponRequest.getCreateUser())
                .build();
        return request;
    }

    /**
     * 为所有用户发放优惠券
     * @param promotionCouponDO
     */
    private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) {

        // 桶的大小，可以抽到参数里
        final int userBucketSize = 1000;
        final int messageBatchSize = 100;

        // 1、查询出库里面最大的userId，作为用户的总数量
        JsonResult<Long> maxUserIdJsonResult = accountApi.queryMaxUserId();
        if (maxUserIdJsonResult.getSuccess()) {
            throw new BaseBizException(maxUserIdJsonResult.getErrorCode(), maxUserIdJsonResult.getErrorMessage());
        }
        Long maxUserId = maxUserIdJsonResult.getData();

        // 2、分成m个桶，每个桶里面有n个用户，每个桶发送一条"批量发送优惠券用户桶消息"，
        // 例：maxUserId = 100w; userBucketSize=1000
        // userBucket1 = [1, 1001)
        // userBucket2 = [1001, 2001)
        // userBucketCount = 1000
        Map<Long, Long> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        long startUserId = 1L;
        while (flagRef.get()) {
            if (startUserId > maxUserId) {
                flagRef.compareAndSet(true, false);
            }
            userBuckets.put(startUserId, startUserId + userBucketSize);
            startUserId += userBucketSize;
        }

        // 3、批量发送消息
        // 例：userBucketCount = 1000; messageBatchSize = 100
        // 批量发送次数 = 10次，经过两次分桶，这里发送消息的次数从100w次降到10次
        int handledBukectCount = 0;
        List<String> jsonMessageBatch = new ArrayList<>(messageBatchSize);
        for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {
            handledBukectCount++;
            PlatformCouponUserBucketMessage message = PlatformCouponUserBucketMessage.builder()
                    .startUserId(userBucket.getKey())
                    .endUserId(userBucket.getValue())
                    .informType(promotionCouponDO.getInformType())
                    .couponId(promotionCouponDO.getId())
                    .activityStartTime(promotionCouponDO.getActivityStartTime())
                    .activityEndTime(promotionCouponDO.getActivityEndTime())
                    .couponType(promotionCouponDO.getCouponType())
                    .build();
            String jsonMessage = JsonUtil.object2Json(message);
            jsonMessageBatch.add(jsonMessage);

            if (jsonMessageBatch.size() == messageBatchSize || handledBukectCount == userBuckets.size()) {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_COUPON_SEND_USER_BUCKET_TOPIC, jsonMessageBatch, "平台发放优惠券用户桶消息");
                jsonMessageBatch.clear();
            }
        }
    }
}
